跳到主要内容

Go 的 RPC 包学习

RPC 库

最近在看 RPC 相关的库的时候发现 Golang 标准库提供了一个 RPC 库 net/rpc 服务器注册一个对象,使其作为具有对象类型名称的服务可见。注册后,可以远程访问对象的导出方法。

能被远程访问的方法:

  • 这个函数必须是公开的
  • 这个函数返回的类型是公开的
  • 这个函数必须有两个参数,两个导出类型
  • 这个方法的第二个参数得是一个指针
  • 这个方法返回错误类型

例如:

func (t *T) MethodName(argType T1, replyType *T2) error

快速使用 HTTP

由于是网络程序,我们需要编写服务端和客户端两个程序。

RPC 服务端

首先是服务端程序:

如下所示,该 RPC 暴露出两个函数 Multiply 和 Divide

package main

import (
"errors"
"log"
"net"
"net/http"
"net/rpc"
)

type Args struct {
A, B int
}

type Quotient struct {
Quo, Rem int
}

type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
*reply = args.A * args.B
return nil
}

func (t *Arith) Divide(args *Args, quo *Quotient) error {
if args.B == 0 {
return errors.New("divide by 0")
}

quo.Quo = args.A / args.B
quo.Rem = args.A % args.B
return nil
}

func main() {
arith := new(Arith)
rpc.Register(arith)

// 注册 HTTP 路由
rpc.HandleHTTP()
if err := http.ListenAndServe(":1234", nil); err != nil {
log.Fatal("serve error:", err)
}
}

定义了一个 Arith 类型,为它编写了两个方法 Multiply 和 Divide。 创建 Arith 类型的对象 arith,调用 rpc.Register(arith) 会注册这两个方法。

注意,上面也提到了 rpc 库对注册的方法有一定的限制,方法必须满足签名

func (t *T) MethodName(argType T1, replyType *T2) error

这里的 rpc.HandleHTTP() 注册 HTTP 路由。http.ListenAndServe(":1234", nil) 在端口1234上启动一个 HTTP 服务,请求 rpc 方法会交给 rpc 内部路由处理。

RPC 客户端

package main

import (
"fmt"
"log"
"net/rpc"
)

type Args struct {
A, B int
}

type Quotient struct {
Quo, Rem int
}

func main() {
client, err := rpc.DialHTTP("tcp", ":1234")
if err != nil {
log.Fatal("dialing:", err)
}

args := &Args{7, 8}
var reply int
err = client.Call("Arith.Multiply", args, &reply)
if err != nil {
log.Fatal("Multiply error:", err)
}
fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply)


args = &Args{15, 6}
var quo Quotient
err = client.Call("Arith.Divide", args, &quo)
if err != nil {
log.Fatal("Divide error:", err)
}
fmt.Printf("Divide: %d/%d=%d...%d\n", args.A, args.B, quo.Quo, quo.Rem)
}

rpc 和 http 是怎么联系的?

上面的最后的启动 RPC 这块,这两个包是怎么联系的呢?

// 注册 HTTP 路由
rpc.HandleHTTP()
if err := http.ListenAndServe(":1234", nil); err != nil {
log.Fatal("serve error:", err)
}

看它们的源码

// src/net/rpc/server.go
const (
// Defaults used by HandleHTTP
DefaultRPCPath = "/_goRPC_"
DefaultDebugPath = "/debug/rpc"
)

func (server *Server) HandleHTTP(rpcPath, debugPath string) {
http.Handle(rpcPath, server)
http.Handle(debugPath, debugHTTP{server})
}

func HandleHTTP() {
DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
}

rpc.HandleHTTP() 会调用 http.Handle() 在预定义的路径上(/_goRPC_)注册处理器。这个处理器最终被添加到 net/http 包中的默认多路复用器上:

// src/net/http/server.go
func Handle(pattern string, handler Handler) {
DefaultServeMux.Handle(pattern, handler)
}

http.ListenAndServer() 第二个参数传入 nil 时也是使用默认的多路复用器。

除了默认的路径 /_goRPC_ 用来处理 RPC 请求,rpc.HandleHTTP() 方法还注册了一个调试路径 /debug/rpc。可以直接在浏览器中访问这个网址

http://localhost:1234/debug/rpc

要先启动服务端服务,就可以看到这些服务被请求的次数(可以结合客户端多次测试)

客户端异步调用

上面的例子中,在客户端使用了同步的调用方式,即一直等待服务端的响应或出错。在等待的过程中,客户端就不能处理其它的任务了,可以将其改成异步的调用方式:

package main

import (
"fmt"
"log"
"net/rpc"
"time"
)

type Args struct {
A, B int
}

type Quotient struct {
Quo, Rem int
}

func main() {
client, err := rpc.DialHTTP("tcp", ":1234")
if err != nil {
log.Fatal("dialing:", err)
}

args1 := &Args{7, 8}
var reply int
multiplyReply := client.Go("Arith.Multiply", args1, &reply, nil)

args2 := &Args{15, 6}
var quo Quotient
divideReply := client.Go("Arith.Divide", args2, &quo, nil)

ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop()

var multiplyReplied, divideReplied bool

for !multiplyReplied || !divideReplied {
select {

case replyCall := <-multiplyReply.Done:
if err := replyCall.Error; err != nil {
fmt.Println("Multiply error:", err)
} else {
fmt.Printf("Multiply: %d*%d=%d\n", args1.A, args1.B, reply)
}
multiplyReplied = true

case replyCall := <-divideReply.Done:
if err := replyCall.Error; err != nil {
fmt.Println("Divide error:", err)
} else {
fmt.Printf("Divide: %d/%d=%d...%d\n", args2.A, args2.B, quo.Quo, quo.Rem)
}
divideReplied = true

case <-ticker.C:
fmt.Println("tick")
}
}
}

异步调用使用 client.Go() 方法,参数与同步调用基本一样。它返回一个 rpc.Call 对象:

// src/net/rpc/client.go
type Call struct {
ServiceMethod string
Args interface{}
Reply interface{}
Error error
Done chan *Call
}

通过监听通道 Done 是否有值判断调用是否完成。上面代码中使用一个 select 语句轮询两次调用的状态。注意一点,如果多个通道都有值,select 执行哪个 case 是随机的。所以可能先输出 divide 的信息

这里补充一个工具类,NewTicker 会返回一个 Channel,每隔一段时间返回一个 Time 对象到这个 Channel 里面

ticker := time.NewTicker(time.Millisecond)

select {
case <-ticker.C:
fmt.Println("tick")
}

定制方法名

默认情况下,rpc.Register() 将方法接收者(receiver)的类型名作为方法名前缀。我们也可以自己设置。这时需要调用 RegisterName(name string, rcvr interface{}) error 方法:

func main() {
arith := new(Arith)
rpc.RegisterName("math", arith)

rpc.HandleHTTP()
if err := http.ListenAndServe(":1234", nil); err != nil {
log.Fatal("serve error:", err)
}
}

上面将注册的方法名前缀改为 math 了,客户端调用时传入的方法名也需要相应的修改:

func main() {
client, err := rpc.DialHTTP("tcp", ":1234")
if err != nil {
log.Fatal("dialing:", err)
}

args := &Args{7, 8}
var reply int
err = client.Call("math.Multiply", args, &reply)
if err != nil {
log.Fatal("Multiply error:", err)
}
fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply)
}

使用 TCP 来实现 RPC

上面都是使用 HTTP 协议来实现 rpc 服务的,rpc库也支持直接使用 TCP 协议。首先,服务端先调用 net.Listen("tcp", ":1234") 创建一个监听某个 TCP 端口的监听器(Accepter),然后使用 rpc.Accept(l) 在此监听器上接受连接并处理:

func main() {
listener, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("listen error:", err)
}

arith := new(Arith)
rpc.Register(arith)
rpc.Accept(listener)
}

然后,客户端调用 rpc.Dial() 以 TCP 协议连接到服务端:

func main() {
client, err := rpc.Dial("tcp", ":1234")
if err != nil {
log.Fatal("dialing:", err)
}

args := &Args{7, 8}
var reply int
err = client.Call("Arith.Multiply", args, &reply)
if err != nil {
log.Fatal("Multiply error:", err)
}

fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply)
}

自定义编码格式

默认客户端与服务端之间的数据使用 gob 编码,我们可以使用其它的格式来编码。在服务端,我们要实现 rpc.ServerCodec 接口:

// src/net/rpc/server.go
type ServerCodec interface {
ReadRequestHeader(*Request) error
ReadRequestBody(interface{}) error
WriteResponse(*Response, interface{}) error

Close() error
}

实现了一个 JSON 格式的编解码器:

type JsonServerCodec struct {
rwc io.ReadWriteCloser
dec *json.Decoder
enc *json.Encoder
encBuf *bufio.Writer
closed bool
}

func NewJsonServerCodec(conn io.ReadWriteCloser) *JsonServerCodec {
buf := bufio.NewWriter(conn)
return &JsonServerCodec{
conn,
json.NewDecoder(conn),
json.NewEncoder(buf),
buf,
false}
}

func (c *JsonServerCodec) ReadRequestHeader(r *rpc.Request) error {
return c.dec.Decode(r)
}

func (c *JsonServerCodec) ReadRequestBody(body interface{}) error {
return c.dec.Decode(body)
}

func (c *JsonServerCodec) WriteResponse(r *rpc.Response, body interface{}) (err error) {
if err = c.enc.Encode(r); err != nil {
if c.encBuf.Flush() == nil {
log.Println("rpc: json error encoding response:", err)
c.Close()
}
return
}

if err = c.enc.Encode(body); err != nil {
if c.encBuf.Flush() == nil {
log.Println("rpc: json error encoding body:", err)
c.Close()
}
return
}
// 刷写到 Response 里面
return c.encBuf.Flush()
}

func (c *JsonServerCodec) Close() error {
if c.closed {
return nil
}
c.closed = true
return c.rwc.Close()
}

func main() {
l, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("listen error:", err)
}

arith := new(Arith)
rpc.Register(arith)

for {
conn, err := l.Accept()
if err != nil {
log.Fatal("accept error:", err)
}

go rpc.ServeCodec(NewJsonServerCodec(conn))
}
}

在 for 循环中需要创建编解码器 JsonServerCodec 传给 ServeCodec 方法。

同样的,客户端要实现 rpc.ClientCodec 接口:

type JsonClientCodec struct {
rwc io.ReadWriteCloser
dec *json.Decoder
enc *json.Encoder
encBuf *bufio.Writer
}

func NewJsonClientCodec(conn io.ReadWriteCloser) *JsonClientCodec {
encBuf := bufio.NewWriter(conn)
return &JsonClientCodec{
conn,
json.NewDecoder(conn),
json.NewEncoder(encBuf),
encBuf}
}

func (c *JsonClientCodec) WriteRequest(r *rpc.Request, body interface{}) (err error) {
if err = c.enc.Encode(r); err != nil {
return
}
if err = c.enc.Encode(body); err != nil {
return
}
return c.encBuf.Flush()
}

func (c *JsonClientCodec) ReadResponseHeader(r *rpc.Response) error {
return c.dec.Decode(r)
}

func (c *JsonClientCodec) ReadResponseBody(body interface{}) error {
return c.dec.Decode(body)
}

func (c *JsonClientCodec) Close() error {
return c.rwc.Close()
}

func main() {
conn, err := net.Dial("tcp", ":1234")
if err != nil {
log.Fatal("dial error:", err)
}

client := rpc.NewClientWithCodec(NewJsonClientCodec(conn))

args := &Args{7, 8}
var reply int
err = client.Call("Arith.Multiply", args, &reply)
if err != nil {
log.Fatal("Multiply error:", err)
}
fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply)
}

要使用 NewClientWithCodec 以指定的编解码器创建客户端。

Reference

Documentation Go 每日一库之 rpc